{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "likely-coating",
   "metadata": {},
   "source": [
    "# Model Gateway Tests \n",
    "\n",
    "The model gateway provides a kafka to http bridge."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "hindu-scroll",
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install kafka-python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "cheap-semiconductor",
   "metadata": {},
   "outputs": [],
   "source": [
    "from kafka import KafkaProducer\n",
    "def produceIrisJson():\n",
    "    key = str.encode(''.join(random.choices(string.ascii_uppercase + string.digits, k=10)))\n",
    "    value='{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}'\n",
    "    b=str.encode(value)\n",
    "    producer = KafkaProducer(bootstrap_servers=BROKER_IP+':9092')\n",
    "    producer.send(\"seldon.default.model.iris.inputs\",value=b,key=key)\n",
    "    producer.flush()\n",
    "    producer.close()\n",
    "    return key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "brief-jewelry",
   "metadata": {},
   "outputs": [],
   "source": [
    "from kafka import KafkaConsumer\n",
    "import random\n",
    "import string\n",
    "\n",
    "def consumeIrisJson(key):\n",
    "    group_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))\n",
    "    consumer = KafkaConsumer(\n",
    "         bootstrap_servers=[BROKER_IP+':9092'],\n",
    "         group_id=group_id,\n",
    "         auto_offset_reset=\"earliest\",\n",
    "    )\n",
    "    consumer.subscribe(['seldon.default.model.iris.outputs'])\n",
    "    \n",
    "    try:\n",
    "        for msg in consumer:\n",
    "            if msg.key == key:\n",
    "                print(msg.value)\n",
    "                break\n",
    "    finally:\n",
    "        consumer.close()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "raising-court",
   "metadata": {},
   "outputs": [],
   "source": [
    "import v2_dataplane_pb2 as v2\n",
    "from google.protobuf.json_format import Parse\n",
    "\n",
    "def produceIrisGrpc():\n",
    "    key = str.encode(''.join(random.choices(string.ascii_uppercase + string.digits, k=10)))\n",
    "    data = '{\"model_name\":\"iris\",\"inputs\":[{\"name\":\"input\",\"contents\":{\"fp32_contents\":[1,2,3,4]},\"datatype\":\"FP32\",\"shape\":[1,4]}]}'\n",
    "    msg = Parse(data,v2.ModelInferRequest())\n",
    "    b = msg.SerializeToString()\n",
    "    producer = KafkaProducer(bootstrap_servers=BROKER_IP+':9092')\n",
    "    producer.send(\"seldon.default.model.iris.inputs\",value=b,key=key)\n",
    "    producer.flush()\n",
    "    producer.close()\n",
    "    return key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "seventh-archives",
   "metadata": {},
   "outputs": [],
   "source": [
    "from kafka import KafkaConsumer\n",
    "import random\n",
    "import string\n",
    "\n",
    "def consumeIrisGrpc(key):\n",
    "    group_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))\n",
    "    consumer = KafkaConsumer(\n",
    "         bootstrap_servers=[BROKER_IP+':9092'],\n",
    "         group_id=group_id,\n",
    "         auto_offset_reset=\"earliest\",\n",
    "    )\n",
    "    consumer.subscribe(['seldon.default.model.iris.outputs'])\n",
    "    \n",
    "    try:\n",
    "        for msg in consumer:\n",
    "            if msg.key == key:\n",
    "                resp = v2.ModelInferResponse()\n",
    "                resp.ParseFromString(msg.value)\n",
    "                print(resp)\n",
    "                break\n",
    "   \n",
    "    finally:\n",
    "        consumer.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "classified-circular",
   "metadata": {},
   "source": [
    "# Model Gateway Local Test\n",
    "\n",
    "\n",
    "## Setup\n",
    "\n",
    "\n",
    "* `make start-all-mlserver`\n",
    "* Update the config file `config/agent.yaml` locally \n",
    "```\n",
    "kafka:\n",
    "  active: true\n",
    "  broker: \"kafka:29092\"\n",
    "```\n",
    "\n",
    "Note: Seems not to presently work with confluent-kafka with Docker compose kraft (no zookeeper) Kafka.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "gross-consortium",
   "metadata": {},
   "outputs": [],
   "source": [
    "BROKER_IP=\"0.0.0.0\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "quarterly-thesis",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\r\n",
      "  \r\n",
      "}\r\n"
     ]
    }
   ],
   "source": [
    "!grpcurl -d '{\"model\":{ \\\n",
    "              \"meta\":{\"name\":\"iris\"},\\\n",
    "              \"modelSpec\":{\"uri\":\"gs://seldon-models/mlserver/iris\",\\\n",
    "                           \"requirements\":[\"sklearn\"],\\\n",
    "                           \"memoryBytes\":500},\\\n",
    "              \"deploymentSpec\":{\"replicas\":1}\\\n",
    "              }}' \\\n",
    "         -plaintext \\\n",
    "         -import-path ../../apis \\\n",
    "         -proto ../../apis/mlops/scheduler/scheduler.proto  0.0.0.0:9004 seldon.mlops.scheduler.Scheduler/LoadModel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "agricultural-coupon",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\r\n",
      "  \"serverName\": \"mlserver\",\r\n",
      "  \"resources\": [\r\n",
      "    {\r\n",
      "      \"totalMemoryBytes\": \"1000000\",\r\n",
      "      \"availableMemoryBytes\": \"1199500\",\r\n",
      "      \"numLoadedModels\": 1,\r\n",
      "      \"overCommitPercentage\": 20\r\n",
      "    }\r\n",
      "  ],\r\n",
      "  \"expectedReplicas\": -1,\r\n",
      "  \"availableReplicas\": 1,\r\n",
      "  \"numLoadedModelReplicas\": 1\r\n",
      "}\r\n"
     ]
    }
   ],
   "source": [
    "!grpcurl -d '{\"name\":\"mlserver\"}' \\\n",
    "         -plaintext \\\n",
    "         -proto ../apis/mlops/scheduler/scheduler.proto  0.0.0.0:9004 seldon.mlops.scheduler.Scheduler/ServerStatus"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "lyric-logan",
   "metadata": {
    "scrolled": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "*   Trying 0.0.0.0...\n",
      "* Connected to 0.0.0.0 (127.0.0.1) port 9000 (#0)\n",
      "> POST /v2/models/iris/infer HTTP/1.1\n",
      "> Host: 0.0.0.0:9000\n",
      "> User-Agent: curl/7.47.0\n",
      "> Accept: */*\n",
      "> Content-Type: application/json\n",
      "> seldon-model: iris\n",
      "> Content-Length: 94\n",
      "> \n",
      "* upload completely sent off: 94 out of 94 bytes\n",
      "< HTTP/1.1 200 OK\n",
      "< content-length: 196\n",
      "< content-type: application/json\n",
      "< date: Sat, 26 Mar 2022 15:22:52 GMT\n",
      "< server: envoy\n",
      "< x-envoy-upstream-service-time: 1186\n",
      "< seldon-route: iris_1\n",
      "< \n",
      "* Connection #0 to host 0.0.0.0 left intact\n",
      "{\"model_name\":\"iris_1\",\"model_version\":\"1\",\"id\":\"75077c6a-398e-4cf3-96a7-11f537049515\",\"parameters\":null,\"outputs\":[{\"name\":\"predict\",\"shape\":[1],\"datatype\":\"INT64\",\"parameters\":null,\"data\":[2]}]}"
     ]
    }
   ],
   "source": [
    "!curl -v http://0.0.0.0:9000/v2/models/iris/infer -H \"Content-Type: application/json\" -H \"seldon-model: iris\"\\\n",
    "        -d '{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "celtic-disclaimer",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "b'{\"model_name\":\"iris_1\",\"model_version\":\"1\",\"id\":\"86d16f98-09c5-4572-b792-127008dcd712\",\"parameters\":null,\"outputs\":[{\"name\":\"predict\",\"shape\":[1],\"datatype\":\"INT64\",\"parameters\":null,\"data\":[2]}]}'\n"
     ]
    }
   ],
   "source": [
    "key = produceIrisJson()\n",
    "consumeIrisJson(key)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "allied-checklist",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "model_name: \"iris_1\"\n",
      "model_version: \"1\"\n",
      "outputs {\n",
      "  name: \"predict\"\n",
      "  datatype: \"INT64\"\n",
      "  shape: 1\n",
      "  contents {\n",
      "    int64_contents: 2\n",
      "  }\n",
      "}\n",
      "\n"
     ]
    }
   ],
   "source": [
    "key = produceIrisGrpc()\n",
    "consumeIrisGrpc(key)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "binary-symphony",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\r\n",
      "  \r\n",
      "}\r\n"
     ]
    }
   ],
   "source": [
    "!grpcurl -d '{\"model\":{\"name\":\"iris\"}}' \\\n",
    "         -plaintext \\\n",
    "         -import-path ../../apis \\\n",
    "         -proto ../../apis/mlops/scheduler/scheduler.proto  0.0.0.0:9004 seldon.mlops.scheduler.Scheduler/UnloadModel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "rough-drilling",
   "metadata": {},
   "source": [
    "# Model Gateway K8S Test\n",
    "\n",
    "\n",
    "## Setup\n",
    "\n",
    "\n",
    "* Install on k8s\n",
    "* Update the config map seldon-agent \n",
    "```yaml\n",
    "kafka:\n",
    "    active: true\n",
    "    broker: \"seldon-kafka-plain-bootstrap.kafka:9092\"\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "public-semester",
   "metadata": {},
   "outputs": [],
   "source": [
    "BROKER_IP=!kubectl get svc seldon-kafka-plain-bootstrap -n kafka -o jsonpath='{.status.loadBalancer.ingress[0].ip}'\n",
    "BROKER_IP=BROKER_IP[0]\n",
    "BROKER_IP"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "satisfactory-pharmacology",
   "metadata": {},
   "outputs": [],
   "source": [
    "SCHEDULER_IP=!kubectl get svc seldon-scheduler -n seldon-mesh -o jsonpath='{.status.loadBalancer.ingress[0].ip}'\n",
    "SCHEDULER_IP=SCHEDULER_IP[0]\n",
    "import os\n",
    "os.environ['SCHEDULER_IP'] = SCHEDULER_IP\n",
    "SCHEDULER_IP"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fantastic-packaging",
   "metadata": {},
   "outputs": [],
   "source": [
    "MESH_IP=!kubectl get svc seldon-mesh -n seldon-mesh -o jsonpath='{.status.loadBalancer.ingress[0].ip}'\n",
    "MESH_IP=MESH_IP[0]\n",
    "import os\n",
    "os.environ['MESH_IP'] = MESH_IP\n",
    "MESH_IP"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "smoking-recycling",
   "metadata": {},
   "outputs": [],
   "source": [
    "!grpcurl -d '{\"model\":{ \\\n",
    "              \"meta\":{\"name\":\"iris\"},\\\n",
    "              \"modelSpec\":{\"uri\":\"gs://seldon-models/mlserver/iris\",\\\n",
    "                           \"requirements\":[\"sklearn\"],\\\n",
    "                           \"memoryBytes\":500},\\\n",
    "              \"deploymentSpec\":{\"replicas\":1},\\\n",
    "              \"streamSpec\":{\"inputTopic\":\"iris-in\",\"outputTopic\":\"iris-out\"}}}' \\\n",
    "         -plaintext \\\n",
    "         -import-path ../../apis \\\n",
    "         -proto ../../apis/mlops/scheduler/scheduler.proto  ${SCHEDULER_IP}:9004 seldon.mlops.scheduler.Scheduler/LoadModel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "round-pressing",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "!grpcurl -d '{\"name\":\"mlserver\"}' \\\n",
    "         -plaintext \\\n",
    "         -proto ../apis/mlops/scheduler/scheduler.proto  ${SCHEDULER_IP}:9004 seldon.mlops.scheduler.Scheduler/ServerStatus"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "enhanced-cosmetic",
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "!curl -v http://${MESH_IP}:80/v2/models/iris/infer -H \"Content-Type: application/json\" -H \"seldon-model: iris\"\\\n",
    "        -d '{\"inputs\": [{\"name\": \"predict\", \"shape\": [1, 4], \"datatype\": \"FP32\", \"data\": [[1, 2, 3, 4]]}]}'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "judicial-bouquet",
   "metadata": {},
   "outputs": [],
   "source": [
    "produceIrisJson()\n",
    "consumeIrisJson()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "experienced-verification",
   "metadata": {},
   "outputs": [],
   "source": [
    "produceIrisGrpc()\n",
    "consumeIrisGrpc()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "toxic-filling",
   "metadata": {},
   "outputs": [],
   "source": [
    "!grpcurl -d '{\"model\":{\"name\":\"iris\"}}' \\\n",
    "         -plaintext \\\n",
    "         -import-path ../../apis \\\n",
    "         -proto ../../apis/mlops/scheduler/scheduler.proto  ${SCHEDULER_IP}:9004 seldon.mlops.scheduler.Scheduler/UnloadModel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "exclusive-cooper",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
